/*  
 * i-OSGi - Tunable Bundle Isolation for OSGi
 * Copyright (C) 2011  Sven Schulz
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 * 
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
package org.iosgi.servicebus;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.felix.ipojo.annotations.Component;
import org.apache.felix.ipojo.annotations.Invalidate;
import org.apache.felix.ipojo.annotations.Property;
import org.apache.felix.ipojo.annotations.Provides;
import org.apache.felix.ipojo.annotations.Requires;
import org.apache.felix.ipojo.annotations.Validate;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.iosgi.util.Network;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceEvent;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.hooks.service.EventHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import ch.ethz.iks.r_osgi.RemoteOSGiException;
import ch.ethz.iks.r_osgi.RemoteOSGiService;
import ch.ethz.iks.r_osgi.RemoteServiceReference;
import ch.ethz.iks.r_osgi.URI;

/**
 * @author Sven Schulz
 */
@Component(immediate = true)
@Provides(specifications = EventHook.class)
public class ServiceBus implements EventHook, Watcher {

	private static final Logger LOGGER = LoggerFactory
			.getLogger(ServiceBus.class);
	private static final String ROOT_PATH = "/iosgi/services";

	@Property(name = "serverPort", value = "2181")
	private int serverPort;

	@Property(name = "serverAddress", value = "")
	private String serverAddress;

	@Requires
	private RemoteOSGiService remoteOSGiSvc;

	@Requires
	private Network network;

	private ZooKeeper zk;
	private InetAddress localAddr;
	private int port;
	private Set<ServiceDescriptor> active;
	private BundleContext context;
	private Map<ServiceDescriptor, RemoteServiceReference> refByDesc;

	ServiceBus(BundleContext context) {
		this.context = context;
		active = new HashSet<ServiceDescriptor>();
		refByDesc = new HashMap<ServiceDescriptor, RemoteServiceReference>();
	}

	private void create(String path) throws KeeperException,
			InterruptedException {
		String[] nodes = path.split("/");
		StringBuilder currPath = new StringBuilder();
		for (String n : nodes) {
			if (n.isEmpty())
				continue;
			currPath.append("/").append(n);
			try {
				if (zk.exists(currPath.toString(), false) != null) {
					continue;
				}
				zk.create(currPath.toString(), new byte[0],
						Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
			} catch (NodeExistsException ke) {
				/* OK, that's fine! */
			}
		}
	}

	@Validate
	void activate() throws IOException, KeeperException, InterruptedException,
			InvalidSyntaxException {
		localAddr = network.getAddress();
		port = remoteOSGiSvc.getListeningPort("r-osgi");
		String zkAddr = (serverAddress.isEmpty() ? localAddr.getHostAddress()
				: serverAddress);
		LOGGER.debug("connecting to zookeeper server ({}:{})", zkAddr,
				serverPort);
		zk = new ZooKeeper(zkAddr + ":" + serverPort, 3000, this);
		create(ROOT_PATH);
		update();
		ServiceReference[] refs = context.getAllServiceReferences(null, null);
		for (ServiceReference ref : refs) {
			ServiceEvent evt = new ServiceEvent(ServiceEvent.REGISTERED, ref);
			process(evt);
		}
	}

	@Invalidate
	void deactivate() throws InterruptedException {
		zk.close();
	}

	@Override
	public void event(ServiceEvent event,
			@SuppressWarnings("rawtypes") Collection contexts) {
		process(event);
	}

	private void process(final ServiceEvent event) {
		ServiceReference ref = event.getServiceReference();
		if (ref.getProperty(RemoteOSGiService.R_OSGi_REGISTRATION) == null) {
			return;
		}
		Long sid = (Long) ref.getProperty(Constants.SERVICE_ID);
		ServiceDescriptor desc = new ServiceDescriptor(localAddr, port, sid);
		String path = ROOT_PATH + "/" + desc;
		switch (event.getType()) {
		case ServiceEvent.REGISTERED: {
			try {
				zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE,
						CreateMode.EPHEMERAL);
			} catch (Exception e) {
				LOGGER.error("service node creation failed (Path: " + path
						+ ")", e);
			}
			break;
		}
		case ServiceEvent.UNREGISTERING: {
			try {
				zk.delete(path, -1);
			} catch (Exception e) {
				LOGGER.error("service node deletion failed (Path: " + path
						+ ")", e);
			}
			break;
		}
		default: {
			// TODO MODIFIED
		}
		}
	}

	private URI getAddress(ServiceDescriptor desc, boolean service) {
		StringBuilder b = new StringBuilder();
		b.append("r-osgi://").append(desc.getHost().getHostAddress())
				.append(':').append(desc.getPort());
		if (service) {
			b.append('#').append(desc.getServiceId());
		}
		return URI.create(b.toString());
	}

	private void connect(final ServiceDescriptor desc)
			throws RemoteOSGiException, IOException {
		URI uri = this.getAddress(desc, false);
		// ChannelEndpointManager cem = remoteOSGiSvc.getEndpointManager(uri);
		// if (cem == null) {
		remoteOSGiSvc.connect(uri);
		// }
	}

	private boolean isLocal(final ServiceDescriptor desc) {
		return desc.getHost().equals(this.localAddr)
				&& desc.getPort() == this.port;
	}

	public synchronized void update() {
		Stat stat = new Stat();
		List<String> paths = null;
		try {
			paths = zk.getChildren(ROOT_PATH, this, stat);
		} catch (Exception e) {
			LOGGER.error("fetching service node list failed", e);
			return;
		}
		Set<ServiceDescriptor> stale = new HashSet<ServiceDescriptor>(active);
		for (String p : paths) {
			ServiceDescriptor desc = new ServiceDescriptor(p);
			if (isLocal(desc)) {
				continue;
			}
			if (!active.contains(desc)) {
				LOGGER.debug("connecting to {}", desc);
				try {
					connect(desc);
				} catch (Exception e) {
					LOGGER.error("failed to establish connection (Service-Descriptor: "
							+ desc + ")");
				}
				LOGGER.debug("getting remote service ({})", desc);
				URI suri = this.getAddress(desc, true);
				RemoteServiceReference rref = null;
				int retries = 5;
				while (rref == null) {
					rref = remoteOSGiSvc.getRemoteServiceReference(suri);
					if (rref == null) {
						LOGGER.debug(
								"remote service unavailable ({}, #retries: {})",
								desc, retries--);
						try {
							Thread.sleep(1000);
						} catch (InterruptedException ie) {
							// Ignore
						}
						continue;
					}
				}
				active.add(desc);
				Object obj = remoteOSGiSvc.getRemoteService(rref);
				obj.hashCode();
				refByDesc.put(desc, rref);
			} else {
				stale.remove(desc);
			}
		}
		for (ServiceDescriptor desc : stale) {
			LOGGER.debug("ungetting remote service ({})", desc);
			active.remove(desc);
			RemoteServiceReference rref = refByDesc.remove(desc);
			try {
				remoteOSGiSvc.ungetRemoteService(rref);
			} catch (Exception e) {
				/* Ignore */
			}
		}
	}

	@Override
	public void process(final WatchedEvent evt) {
		switch (evt.getType()) {
		case NodeChildrenChanged: {
			update();
			break;
		}
		}
	}

}